package com.uyuni.rpc.client.provider;

import com.uyuni.rpc.client.metrics.ServiceMeterManager;
import com.uyuni.rpc.client.provider.DefaultServiceProviderContainer.CurrentServiceState;
import com.uyuni.rpc.client.provider.flow.control.ServiceFlowControllerManager;
import com.uyuni.rpc.client.provider.model.ServiceWrapper;
import com.uyuni.rpc.common.protocol.UyuniProtocol;
import com.uyuni.rpc.common.transport.body.RequestCustomBody;
import com.uyuni.rpc.common.transport.body.ResponseCustomBody;
import com.uyuni.rpc.common.transport.body.ResponseCustomBody.ResultWrapper;
import com.uyuni.rpc.common.utils.Pair;
import com.uyuni.rpc.common.utils.Status;
import com.uyuni.rpc.common.utils.SystemClock;
import com.uyuni.rpc.transport.model.RemotingTransporter;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

import static com.uyuni.rpc.common.serialization.SerializerHolder.serializerImpl;
import static com.uyuni.rpc.common.utils.Reflects.fastInvoke;
import static com.uyuni.rpc.common.utils.Reflects.findMatchingParameterTypes;
import static com.uyuni.rpc.common.utils.Status.*;

/**
 * Core controller on the provider side that handles consumer RPC requests
 * and records per-service call statistics.
 *
 * @author BazingaLyn
 * @description core controller for consumer RPC requests; also counts handled calls
 * @time 2016-08-30
 * @modifytime
 */
public class ProviderRPCController {

    private static final Logger logger = LoggerFactory.getLogger(ProviderRPCController.class);

    /** Provider this controller reads the registry controller (service container, flow control) from. */
    private final DefaultProvider defaultProvider;

    public ProviderRPCController(DefaultProvider defaultProvider) {
        this.defaultProvider = defaultProvider;
    }

    /**
     * Entry point for one incoming RPC request.
     * <p>
     * Deserializes the request body, updates the call metrics, looks up the target
     * service, applies the per-service flow control when the service enables it,
     * and finally hands the request to {@link #process}. Every failure before the
     * invocation is answered through {@link #rejected}.
     *
     * @param request the transport envelope carrying the serialized request body
     * @param channel the channel the response is written to
     */
    public void handlerRPCRequest(RemotingTransporter request, Channel channel) {
        String serviceName = "";
        RequestCustomBody body;
        try {
            byte[] bytes = request.bytes();
            int requestSize = bytes.length;
            // Drop the raw payload from the transporter once it has been captured locally.
            request.bytes(null);

            body = serializerImpl().readObject(bytes, RequestCustomBody.class);

            request.setCustomBody(body);
            serviceName = body.getServiceName();
            // TODO call statistics
            ServiceMeterManager.incrementRequestSize(serviceName, requestSize);
            ServiceMeterManager.incrementCallTimes(serviceName);
            ServiceFlowControllerManager serviceFlowControllerManager = defaultProvider.getProviderRegistryController().getServiceFlowControllerManager();
            serviceFlowControllerManager.incrementCallCount(serviceName);

        } catch (Exception e) {
            // Keep the cause visible: without this the reason for a BAD_REQUEST is lost.
            logger.warn("Failed to decode RPC request {}.", request, e);
            rejected(BAD_REQUEST, channel, request, serviceName);
            return;
        }

        final Pair<CurrentServiceState, ServiceWrapper> pair = defaultProvider.getProviderRegistryController().getProviderContainer().lookupService(serviceName);
        if (pair == null || pair.getValue() == null) {
            rejected(SERVICE_NOT_FOUND, channel, request, serviceName);
            return;
        }

        // App-level flow control (rate limiting), only for services that enable it.
        if (pair.getValue().isFlowController()) {
            ServiceFlowControllerManager serviceFlowControllerManager = defaultProvider.getProviderRegistryController().getServiceFlowControllerManager();
            if (!serviceFlowControllerManager.isAllow(serviceName)) {
                rejected(APP_FLOW_CONTROL, channel, request, serviceName);
                return;
            }
        }
        // Handle the request.
        process(pair, request, channel, serviceName, body.getTimestamp());

    }

    /**
     * Core RPC processing: invokes the target method and writes the result back.
     * <p>
     * NOTE(review): an exception thrown by the parameter matching or by the
     * invocation propagates out of this method and no response is written here;
     * whether a caller handles that case is not visible from this class.
     *
     * @param pair        the service state (key) and service wrapper (value) found for the request
     * @param request     the request whose custom body carries the call arguments
     * @param channel     the channel the response is written to
     * @param serviceName the service name used when recording the elapsed time
     * @param beginTime   the timestamp taken from the request body; elapsed time is measured from it
     */
    private void process(Pair<CurrentServiceState, ServiceWrapper> pair, final RemotingTransporter request, Channel channel, final String serviceName, final long beginTime) {

        // Service state (degraded, rate-limited, auto-degrade, etc.)
        CurrentServiceState currentServiceState = pair.getKey();
        // Service wrapper instance
        ServiceWrapper serviceWrapper = pair.getValue();
        // The actual service implementation object
        Object targetCallObj = serviceWrapper.getServiceProvider();
        // Method arguments
        Object[] args = ((RequestCustomBody) request.getCustomBody()).getArgs();

        // If the service is flagged as degraded and has its own mock provider,
        // switch the invocation target to the mock implementation.
        if (currentServiceState.getHasDegrade().get() && serviceWrapper.getMockDegradeServiceProvider() != null) {
            targetCallObj = serviceWrapper.getMockDegradeServiceProvider();
        }
        // Method name
        String methodName = serviceWrapper.getMethodName();
        // Candidate parameter type lists
        List<Class<?>[]> parameterTypesList = serviceWrapper.getParamters();

        Class<?>[] parameterTypes = findMatchingParameterTypes(parameterTypesList, args);
        // Invoke the method and capture its return value
        Object invokeResult = fastInvoke(targetCallObj, methodName, parameterTypes, args);

        ResultWrapper result = new ResultWrapper();
        result.setResult(invokeResult);
        ResponseCustomBody body = new ResponseCustomBody(Status.OK.value(), result);

        final RemotingTransporter response = RemotingTransporter.createResponseTransporter(UyuniProtocol.RPC_RESPONSE, body, request.getOpaque());

        channel.writeAndFlush(response).addListener((ChannelFutureListener) future -> {
            long elapsed = SystemClock.millisClock().now() - beginTime;
            if (future.isSuccess()) {
                // Statistics: total time is only recorded when the response was written successfully.
                ServiceMeterManager.incrementTotalTime(serviceName, elapsed);
            } else {
                logger.info("request {} get failed response {}", request, response);
            }
        });

    }

    /**
     * Answers a request that could not be served with an error response.
     * <p>
     * Increments the fail counter when a service name is known, builds an error
     * message for the given status and writes it back on the channel. Statuses
     * other than the ones handled below are logged and produce no response.
     *
     * @param status      the rejection reason; its value becomes the response status
     * @param channel     the channel the error response is written to
     * @param request     the rejected request; its opaque id is echoed in the response.
     *                    Its custom body may be {@code null} when decoding failed.
     * @param serviceName the requested service name, or blank when it could not be decoded
     */
    private void rejected(Status status, Channel channel, final RemotingTransporter request, String serviceName) {

        if (StringUtils.isNotBlank(serviceName)) {
            ServiceMeterManager.incrementFailTimes(serviceName);
        }
        ResultWrapper result = new ResultWrapper();
        switch (status) {
            case BAD_REQUEST:
                result.setError("bad request");
                // Must not fall through: the custom body can be null here and the
                // "bad request" message would otherwise be overwritten.
                break;
            case SERVICE_NOT_FOUND:
                // serviceName was read from the decoded body by the caller, so the
                // custom body does not need to be dereferenced again.
                result.setError(serviceName + " no service found");
                break;
            case APP_FLOW_CONTROL:
            case PROVIDER_FLOW_CONTROL:
                result.setError("over unit time call limit");
                break;
            default:
                logger.warn("Unexpected status: {}.", status.description());
                return;
        }
        logger.warn("Service rejected: {}.", result.getError());

        ResponseCustomBody responseCustomBody = new ResponseCustomBody(status.value(), result);
        final RemotingTransporter response = RemotingTransporter.createResponseTransporter(UyuniProtocol.RPC_RESPONSE, responseCustomBody,
                request.getOpaque());

        channel.writeAndFlush(response).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                logger.info("request error {} get success response {}", request, response);
            } else {
                logger.info("request error {} get failed response {}", request, response);
            }
        });
    }

}
